package com.atguigu.flink.checkpoint;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.time.Duration;

/**
 * Demo of Flink checkpoint configuration combined with managed operator state.
 *
 * <p>Background notes:
 * <ul>
 *   <li><b>State backend</b>: manages state (read, write, snapshot, restore).</li>
 *   <li><b>Checkpoint storage</b>: decides where state snapshots are persisted.
 *       The default is {@code JobManagerCheckpointStorage}, which keeps the snapshots on the
 *       JobManager heap and is therefore not reliable.</li>
 *   <li>To keep snapshots safe, write them to an external file system. They then survive a
 *       JobManager crash, and a restarted application can resume from the last snapshot.</li>
 * </ul>
 *
 * <p>Checkpointing modes:
 * <ul>
 *   <li>Default: barrier-aligned, exactly-once.</li>
 *   <li>Barrier-aligned, at-least-once:
 *       {@code env.enableCheckpointing(2000, CheckpointingMode.AT_LEAST_ONCE);}</li>
 *   <li>Unaligned barriers (exactly-once only; Flink rejects other modes with
 *       "Unaligned checkpoints can only be used with checkpointing mode EXACTLY_ONCE"):
 *       {@code env.enableCheckpointing(2000);} followed by
 *       {@code env.getCheckpointConfig().enableUnalignedCheckpoints(true);}</li>
 * </ul>
 *
 * <p>The job reads lines from a socket, appends each line to a list kept in operator state,
 * and prints the accumulated list. Any line containing {@code "x"} makes the sink throw, which
 * simulates a failure so that recovery from the latest checkpoint can be observed.
 *
 * <p>Created by Smexy on 2023/11/15
 */
public class Demo1_Checkpoint
{
    public static void main(String[] args) {

        // Fix the REST / web UI port so the local UI is reachable at a known address.
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 3333);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // Trigger a checkpoint every 2000 ms (default mode: exactly-once).
        env.enableCheckpointing(2000);

        // Checkpoint tuning.
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.enableUnalignedCheckpoints(true);

        // A checkpoint that takes longer than 2 minutes is considered failed and is discarded,
        // even if it eventually completes.
        checkpointConfig.setCheckpointTimeout(120000);

        // Minimum pause between two consecutive checkpoints.
        // NOTE(review): 60 s is far larger than the 2 s interval configured above, so this pause
        // presumably dominates how often checkpoints actually run - confirm this is intended.
        checkpointConfig.setMinPauseBetweenCheckpoints(60000);

        // Maximum number of checkpoints in flight. Must be 1 with unaligned checkpoints
        // ("maxConcurrentCheckpoints can't be > 1 if UnalignedCheckpoints enabled").
        checkpointConfig.setMaxConcurrentCheckpoints(1);

        // Only relevant because unaligned checkpoints are enabled above: every checkpoint starts
        // out aligned and switches to unaligned if barrier alignment takes longer than 60 s.
        checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(60));

        // Keep the chk-xx directories on disk after the job is cancelled.
        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Tolerate up to 5 checkpoint failures; beyond that the job fails.
        checkpointConfig.setTolerableCheckpointFailureNumber(5);

        // Persist checkpoints to the local file system. On a cluster, point this at a shared
        // file system instead, e.g. an HDFS URI such as hdfs://hadoop102:8020/ck.
        checkpointConfig.setCheckpointStorage("file:///e:/tmp");

        env.setParallelism(2);
        env.socketTextStream("hadoop102", 8888)
           .map(new MyMapFunction())
           .addSink(new SinkFunction<String>()
           {
               @Override
               public void invoke(String value, Context context) throws Exception {
                   if (value.contains("x")) {
                       // Simulate a failure by hand so that recovery can be observed.
                       throw new RuntimeException("出故障了...");
                   }
                   System.out.println(value);
               }
           });

        try {
            env.execute();
        } catch (Exception e) {
            // Demo entry point: report the failure and let main return normally.
            e.printStackTrace();
        }
    }

    /**
     * Accumulates every received line in operator list state and emits the accumulated
     * contents, formatted like a list ({@code [a, b, c]}), for each input element.
     */
    private static class MyMapFunction implements MapFunction<String, String>, CheckpointedFunction
    {
        /** Name under which the list state is registered and looked up again on restore. */
        private static final String STATE_NAME = "1";

        /**
         * Managed state: Flink snapshots it and restores it on restart. It is assigned in
         * {@link #initializeState}, so it is transient and not shipped with the serialized
         * function object.
         */
        private transient ListState<String> result;

        /**
         * Appends {@code value} to the state and returns everything accumulated so far.
         *
         * @param value the incoming line
         * @return the accumulated elements rendered as {@code [e1, e2, ...]}
         * @throws Exception if accessing the state fails
         */
        @Override
        public String map(String value) throws Exception {
            // Write to the state.
            result.add(value);
            // Read it back. Render the elements explicitly instead of calling toString() on the
            // returned Iterable, whose string form depends on the state backend implementation.
            return "[" + String.join(", ", result.get()) + "]";
        }

        /**
         * Called periodically according to the checkpoint configuration. The managed list state
         * is snapshotted automatically, so there is nothing to do here.
         */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // Intentionally empty.
        }

        /**
         * Called when the task starts or restarts: obtains the list state handle from the
         * operator state store. On the very first start the state is empty; after a restart it
         * holds the contents of the restored snapshot.
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // Access point to this operator's state.
            OperatorStateStore operatorStateStore = context.getOperatorStateStore();
            // Describe the state: its name and element type.
            ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>(STATE_NAME, String.class);
            // Look the state up by its descriptor, creating an empty one if none exists yet.
            result = operatorStateStore.getListState(listStateDescriptor);
        }
    }
}
